使用Canal和RocketMQ实现数据库变更订阅处理

本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。

背景信息

CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。

Canal是一个开源项目,仓库地址请参见Canal

应用场景

基于Binlog日志实现增量订阅和消费的典型业务场景如下:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务Cache刷新

  • 带业务逻辑的增量数据处理

方案介绍

基于Canal云消息队列 RocketMQ 版CDC方案如下:

CDC方案

如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,并同步到云消息队列 RocketMQ 版等存储或其他中间件系统。

具体操作步骤如下:

  1. 配置MySQL:开启MySQLBinlog功能,创建测试需要的数据库和表。

  2. 部署Canal:部署一个canal-deployer(server),监听并接收MySQL数据库的Binlog。

  3. 测试验证:验证数据变动后消息发送的情况。

环境要求

  • 资源要求

  • 网络要求

    • 部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。

  • 版本要求

    服务

    版本

    说明

    Canal

    1.1.6

    其他版本请参见Canal Release

    MySQL

    8.0

    支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。

    云消息队列 RocketMQ 版

    5.x

    支持4.x5.x版本,推荐使用5.x版本实例。

    重要

    暂不支持5.x版本的Serverless实例。

1.配置MySQL

1.1 开启Binlog功能

阿里云RDS MySQL

阿里云RDS MySQL默认已开启Binlog功能,并且账号默认具有Binlog dump权限,可以直接跳过这一步。

自建MySQL

  • 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin  # 开启binlog
    binlog-format=ROW  # 选择ROW模式
    server_id=1  # 配置MySQL replaction需要定义,不要和canalslaveId重复
  • 创建用户canal并授权MySQL slave 的权限。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

1.2 创建数据库

执行下面的SQL,创建一个名为canal的数据库。

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 创建表

执行下面的SQL,在canal数据库创建一个名为students的表。

CREATE TABLE students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

2.部署Canal

2.1 安装JDK

执行下面的命令,安装JDK。

sudo yum install java-1.8.0-openjdk

2.2 下载canal-deployer

执行下面的命令,下载canal-deployer安装包。

sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.3 解压缩安装包

执行下面的命令,创建目录canal-server,并将下载的安装包解压缩到canal-server目录中。

# 创建目录canal-server
sudo mkdir -p /usr/local/canal-server

# 将下载的安装包解压缩到canal-server目录中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.4 修改配置

配置Canal启动的参数,详细配置请参见参数说明

  1. 执行下面的命令,配置canal.properties。

sudo vi /usr/local/canal-server/conf/canal.properties
# 服务端模式
canal.serverMode = rocketMQ
# 云消息队列 RocketMQ 版实例的用户名。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.accessKey = 6W0xz2uPf******
# 云消息队列 RocketMQ 版实例的密码。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.secretKey = sK56k1DrGx******
# 消息队列接入的方式
canal.mq.accessChannel = cloud
# 消息发送格式。云消息队列 RocketMQ 版不支持批量发送,canal.mq.flatMessage需要设置成false;消费端获取到的消息body后需反序列化body内容;Java语言可使用com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) 方法转化,其他语言需参照该方法自行实现。
canal.mq.flatMessage = false
# 云消息队列 RocketMQ 版实例中Group名
rocketmq.producer.group = canal_test
# 是否开启消息轨迹
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic = 
# 云消息队列 RocketMQ 版实例的命名空间。云消息队列 RocketMQ 版5.x实例无需填写该参数。
rocketmq.namespace =
# 云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重试次数
rocketmq.retry.times.when.send.failed = 0
# 是否启用VIP Netty通道发送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag = 
  1. 执行下面的命令,配置instance.properties。

sudo vi /usr/local/canal-server/conf/example/instance.properties
# 阿里云RDS MySQL数据库的连接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL数据库的账号
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL数据库的密码
canal.instance.dbPassword=xxx
# mysql 数据解析关注的表,Perl正则表达式,canal\\..*表示canal schema下所有表 
canal.instance.filter.regex=canal\\..*
# 云消息队列 RocketMQ 版实例的topic名称
canal.mq.topic=canal_topic

2.5 启动canal-deployer

执行下面的命令启动canal-deployer。

/usr/local/canal-server/bin/startup.sh

2.6 验证启动

  1. 执行下面的命令查看canal.log日志文件,确认Canal成功启动。

sudo vi /usr/local/canal-server/logs/canal/canal.log  
2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  1. 执行下面的命令查看example.log日志文件,确认Canal Instance成功启动。

sudo vi /usr/local/canal-server/logs/example/example.log 
2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

3.测试验证

3.1 向MYSQL数据库中添加数据

执行下面的SQL,向步骤1.3 创建表所创建的表students添加一条数据。

INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');

3.2 查看Canal发送的消息

登录云消息队列 RocketMQ 版控制台,找到部署时配置的实例,在消息查询页面查看消息如下:

image

相关参考

Canal支持更多的高级功能设置,完整的参数设置请参见官网文档Canal快速入门